Skip to content

Conversation

NathanFlurry
Copy link
Member

Fixes RVT-5129
Fixes RVT-5118

Copy link

linear bot commented Sep 5, 2025

Copy link

vercel bot commented Sep 5, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivet-site Ready Ready Preview Comment Sep 9, 2025 3:46am
1 Skipped Deployment
Project Deployment Preview Comments Updated (UTC)
rivet-studio Ignored Ignored Preview Sep 9, 2025 3:46am

Copy link
Member Author

NathanFlurry commented Sep 5, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

Copy link

claude bot commented Sep 5, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link

pkg-pr-new bot commented Sep 5, 2025

Open in StackBlitz

npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@2874
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@2874
npm i https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@2874

commit: a294c2e

Comment on lines +365 to +391
async fn next(&mut self) -> Result<DriverOutput> {
loop {
tokio::select! {
biased;
// Prefer local messages to reduce latency
res = self.local_rx.recv() => {
match res {
std::result::Result::Ok(payload) => {
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload });
}
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => {
// Skip lagged and continue
continue;
}
std::result::Result::Err(broadcast::error::RecvError::Closed) => {
// Local channel closed; fall back to driver only
// Replace with a closed receiver to avoid busy loop
// We simply continue and rely on driver
}
}
}
res = self.driver.next() => {
return res;
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There appears to be a potential issue in the error handling for broadcast::error::RecvError::Closed. The comment indicates that the code should replace the receiver with a closed one to avoid a busy loop, but the implementation doesn't actually do this. When the local channel is closed, the code continues the loop without modifying self.local_rx, which means it will repeatedly hit the same closed channel error.

Consider replacing self.local_rx with a permanently closed receiver when this error is encountered, perhaps by creating a new closed channel:

Err(broadcast::error::RecvError::Closed) => {
    // Replace with a permanently closed receiver to avoid busy loop
    let (tx, rx) = broadcast::channel::<Vec<u8>>(1);
    drop(tx);  // Close the channel
    self.local_rx = rx;
    continue;
}
Suggested change
async fn next(&mut self) -> Result<DriverOutput> {
loop {
tokio::select! {
biased;
// Prefer local messages to reduce latency
res = self.local_rx.recv() => {
match res {
std::result::Result::Ok(payload) => {
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload });
}
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => {
// Skip lagged and continue
continue;
}
std::result::Result::Err(broadcast::error::RecvError::Closed) => {
// Local channel closed; fall back to driver only
// Replace with a closed receiver to avoid busy loop
// We simply continue and rely on driver
}
}
}
res = self.driver.next() => {
return res;
}
}
}
}
async fn next(&mut self) -> Result<DriverOutput> {
loop {
tokio::select! {
biased;
// Prefer local messages to reduce latency
res = self.local_rx.recv() => {
match res {
std::result::Result::Ok(payload) => {
return Ok(DriverOutput::Message { subject: self.subject.clone(), payload });
}
std::result::Result::Err(broadcast::error::RecvError::Lagged(_)) => {
// Skip lagged and continue
continue;
}
std::result::Result::Err(broadcast::error::RecvError::Closed) => {
// Local channel closed; fall back to driver only
// Replace with a closed receiver to avoid busy loop
let (tx, rx) = broadcast::channel::<Vec<u8>>(1);
drop(tx); // Close the channel
self.local_rx = rx;
continue;
}
}
}
res = self.driver.next() => {
return res;
}
}
}
}

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

@NathanFlurry NathanFlurry force-pushed the 09-05-chore_ups_add_message_chunking_ups_protocol branch from 9d59211 to bf98d22 Compare September 8, 2025 05:12
@NathanFlurry NathanFlurry force-pushed the 09-04-chore_runner_connect_tunnel_before_connecting_pb_ws branch from cf0534f to 90ce950 Compare September 8, 2025 05:12
Copy link

claude bot commented Sep 8, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link

claude bot commented Sep 8, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

@NathanFlurry NathanFlurry force-pushed the 09-05-chore_ups_add_message_chunking_ups_protocol branch from bf98d22 to c9cff29 Compare September 9, 2025 03:30
Copy link

claude bot commented Sep 9, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

@NathanFlurry NathanFlurry force-pushed the 09-05-chore_ups_add_message_chunking_ups_protocol branch from c9cff29 to a294c2e Compare September 9, 2025 03:38
Copy link

claude bot commented Sep 9, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

Copy link
Contributor

graphite-app bot commented Sep 9, 2025

Merge activity

  • Sep 9, 5:46 PM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Sep 9, 5:47 PM UTC: CI is running for this pull request on a draft pull request (#2896) due to your merge queue CI optimization settings.
  • Sep 9, 5:49 PM UTC: Merged by the Graphite merge queue via draft PR: #2896.

graphite-app bot pushed a commit that referenced this pull request Sep 9, 2025
@graphite-app graphite-app bot closed this Sep 9, 2025
@graphite-app graphite-app bot deleted the 09-05-chore_ups_add_message_chunking_ups_protocol branch September 9, 2025 17:49
This was referenced Sep 15, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants